package cn.mj.consumer;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;

import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

import cn.mj.entity.Dept;
import cn.mj.entity.User;

@Service
public class Object1Consumer {

	@Value("${zk_lock_path}")
	private String zkLockPath;
	
	@Autowired
	private CuratorFramework curatorFramework;
	
	@JmsListener(destination="object_all")
	public void receiveObject(Object object) {
		ActiveMQObjectMessage msg = (ActiveMQObjectMessage)object;
		byte[] data = msg.getContent().getData();
		 ObjectInputStream ois;
		try {
			ois = new ObjectInputStream(new ByteArrayInputStream(data));
			object = ois.readObject(); 
		} catch (Exception e) {
			e.printStackTrace();
		}
		
		if (object instanceof User) {
//			System.out.println("消费者1===user：" + object.getClass());
//			System.out.println("消费者1===user：" + (User)object);
			InterProcessMutex mutex = null;
			try {
				User user = (User)object;
				CuratorFrameworkState state = curatorFramework.getState();
				if (CuratorFrameworkState.STOPPED.equals(state)) {
					System.out.println("已经关闭了于zookeeper的连接，就再重新连接。。。。。。");
					curatorFramework.start();
				}
				mutex = new InterProcessMutex(curatorFramework, zkLockPath + "/" + user.getId());
				System.err.println("消费者1===user"+user.getId()+"<<开始>>被消费");
				mutex.acquire();
				System.err.println("消费者1===user"+user.getId()+"<<已经>>被消费");
			} catch (Exception e) {
				e.printStackTrace();
			}finally {
				if (mutex != null) {
					try {
						mutex.release();
					} catch (Exception e) {
						e.printStackTrace();
						mutex = null;
					}
				}
			}
		}else if (object instanceof Dept) {
			InterProcessMutex mutex = null;
			try {
//				System.out.println("消费者1===dept：" + object.getClass());
//				System.out.println("消费者1===dept：" + (Dept)object);
				Dept dept = (Dept)object;
				//如果是dept，那么就必须要等到相同ID的user消费了才能消费dept
				CuratorFrameworkState state = curatorFramework.getState();
				if (CuratorFrameworkState.STOPPED.equals(state)) {
					System.out.println("已经关闭了于zookeeper的连接，就再重新连接。。。。。。");
					curatorFramework.start();
				}
				mutex = new InterProcessMutex(curatorFramework, zkLockPath + "/" + dept.getId());
				System.err.println("消费者1===dept"+dept.getId()+"<<开始>>被消费");
				mutex.acquire();
				System.err.println("消费者1===dept"+dept.getId()+"<<已经>>被消费");
			} catch (Exception e) {
				e.printStackTrace();
			}finally {
				if (mutex != null) {
					try {
						mutex.release();
					} catch (Exception e) {
						e.printStackTrace();
						mutex = null;
					}
				}
			}
		}
		
	}
	
}
